Implement stream operations in the conformance client #196
Conversation
supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true
I've removed the line about client certs since this property defaults to false. An error is occurring when this code tries to instantiate a client cert from the data in the conformance request. I haven't dug into it, and decided to just omit it for now.
It turns out that we actually can't do half-duplex stuff over HTTP 1.1, because we don't know ahead of time, for a given bidi RPC, whether it's half-duplex or full-duplex (in the framework, based on generated protobuf metadata). But the request body must be marked as "duplex" if it might be full-duplex. So we have to mark the body as duplex for all bidi RPCs, and okhttp then disallows their use with HTTP 1.1.
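To make the chain of reasoning concrete, here is a toy model (not the actual framework code; the enum and function names are made up for illustration): the generated metadata only says a method is "bidi", not whether it is half- or full-duplex, so every bidi request body must conservatively be marked duplex, and okhttp refuses duplex bodies over HTTP/1.1.

```kotlin
// Hypothetical model of the constraint described above.
enum class StreamType { UNARY, CLIENT_STREAM, SERVER_STREAM, BIDI }

// A BIDI method *might* be full-duplex, so its body must be marked duplex.
fun mustMarkBodyDuplex(type: StreamType): Boolean = type == StreamType.BIDI

// okhttp rejects duplex request bodies over HTTP/1.1.
fun usableOverHttp1(type: StreamType): Boolean = !mustMarkBodyDuplex(type)

fun main() {
    check(usableOverHttp1(StreamType.CLIENT_STREAM))
    check(usableOverHttp1(StreamType.SERVER_STREAM))
    check(!usableOverHttp1(StreamType.BIDI)) // even half-duplex bidi is rejected
    println("ok")
}
```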
Do we want to comment these properties out with a TODO so we know to come back to them, instead of removing them? Up to you, but it might help identify remaining work, since it sounds like there are several things to track down.
Well, I think we can leave supportsHalfDuplexBidiOverHttp1 out, since there's no realistic way of changing that. But, yes, I should add supportsTlsClientCerts back with a TODO to get it working.
val requests: RequestStream<Req>
val responses: ResponseStream<Resp>
I've changed a few things in these interfaces to be property getters instead of methods since they really are lightweight properties.
@@ -46,6 +46,7 @@ abstract class ClientStreamClient<Req : MessageLite, Resp : MessageLite>(
 interface ClientStream<Req : MessageLite, Resp : MessageLite> {
     suspend fun send(req: Req)
     suspend fun closeAndReceive(): ResponseMessage<Resp>
+    suspend fun cancel()
I had to add this in order to expose the ability to cancel a client-stream RPC. I could have also called it close for consistency with the other methods. What do you think? Should I change it?
TBH, I kind of don't like that the methods that cancel the RPC are named like "receive close" / "close response", since that's not very clear about what they're really doing. Maybe when overhauling these interfaces (more or less making them look like the interfaces in the conformance client's adapt package), we can revisit some of the names and call this operation cancel instead of close 🤷.
I think this is fine for now. Maybe there's a better API we could put together that would hook into coroutine cancellation instead of having to expose separate methods for this.
I think this is already hooked into coroutine cancellation. But I'm not sure I 100% understand how that actually works, and I don't think it alone does what we want: the continuation in question for streaming simply returns the bidirectional stream object, so the only thing it would cancel is the actual call to the streaming RPC method. Once that call returns the stream object, the continuation is complete and cancellation no longer does anything.
So I guess we'd have to do something similar in the app code, to tie its coroutine's cancellation to cancelling the stream (which I think means we still need a cancel method of some sort). It would be challenging for the framework to automatically manage coroutine lifetimes and tie them to the stream, since ownership is unclear -- i.e. one coroutine could pass the stream to another that runs on a different thread/worker, in which case it's unclear what the coroutine cancellation semantics should be.
@@ -71,7 +74,8 @@ enum class Code(val codeName: String, val value: Int) {
     if (value == null) {
         return UNKNOWN
     }
     return values().first { code -> code.value == value }
This would throw if the given value was not known, which seemed wrong.
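A sketch of the fix (the enum here is trimmed down to a few made-up members, so names and numeric values are assumptions): `values().first { ... }` throws `NoSuchElementException` for an unrecognized code, so an unknown value should instead fall back to `UNKNOWN` via `firstOrNull`.

```kotlin
enum class Code(val value: Int) {
    OK(0),
    CANCELED(1),
    UNKNOWN(2);

    companion object {
        fun fromValue(value: Int?): Code {
            if (value == null) {
                return UNKNOWN
            }
            // firstOrNull instead of first: an unrecognized numeric code
            // maps to UNKNOWN rather than throwing NoSuchElementException.
            return values().firstOrNull { code -> code.value == value } ?: UNKNOWN
        }
    }
}

fun main() {
    check(Code.fromValue(0) == Code.OK)
    check(Code.fromValue(null) == Code.UNKNOWN)
    check(Code.fromValue(999) == Code.UNKNOWN) // previously this threw
    println("ok")
}
```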
@@ -175,7 +172,8 @@ private class ResponseCallback(
     }
 }

-internal class PipeDuplexRequestBody(
+internal class PipeRequestBody(
+    private val duplex: Boolean,
This was necessary in order to do client or server streaming over HTTP 1.1.
Nice! I expected this might be a bit more tricky - especially for client streaming calls.
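For illustration, here is an okhttp-free sketch of the idea behind the rename (all names and internals here are simplified stand-ins, not the real implementation): one body class serves both duplex (bidi) and non-duplex (client/server streaming over HTTP 1.1) requests, and for a non-duplex body, `writeTo` must not return until the writer side has finished and closed.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for PipeRequestBody: a StringBuilder replaces the okio pipe.
class PipeRequestBody(private val duplex: Boolean) {
    private val done = CountDownLatch(1)
    private val buffer = StringBuilder()

    fun write(chunk: String) {
        synchronized(buffer) { buffer.append(chunk) }
    }

    // The writer signals that the request body is complete.
    fun close() = done.countDown()

    fun isDuplex() = duplex

    // Called by the HTTP client to produce the request body.
    fun writeTo(sink: OutputStream) {
        if (!duplex) {
            // Non-duplex (HTTP/1.1-compatible): block until the writer closes.
            done.await()
        }
        synchronized(buffer) { sink.write(buffer.toString().toByteArray()) }
    }
}

fun main() {
    val body = PipeRequestBody(duplex = false)
    val out = ByteArrayOutputStream()
    val writer = Thread {
        body.write("hello ")
        body.write("world")
        body.close()
    }
    writer.start()
    body.writeTo(out) // does not return until the writer calls close()
    writer.join()
    check(out.toString() == "hello world")
    println("ok")
}
```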
This includes a fix to the recently observed flakiness in client streams, which was introduced in #196. The problems are described below. The fixes are each in their own commit, so reviewing commit-by-commit might make this PR easier to read.

1. There was an issue with the use of `Result<Unit>` as the return type for `send` operations. Since the wrapped type (`Unit`) is really a void/no-return type, calling code was never checking the result, which means that when the operation failed, it was never noticed. I'm a little surprised that we don't have linters or warnings for calls to functions that return `Result` where that return value is ignored. It turns out that this was not just a problem in my calling code failing to check the return value for failure: the framework itself had the same bug. The stream wrapper in `ProtocolClient` (wrapping the underlying stream returned by `ConnectOkHttpClient`) was using an `onSend` callback that called the underlying stream's `send`, but the callback simply returned `Unit` instead of `Result<Unit>`, and the method that propagated the result wasn't checking it and throwing. This is the biggest commit here, because I did some overhauling of `Stream`. For one, I changed it to an interface -- mainly so that we can apply a decorator pattern to it and `HTTPClientInterface` (more on that in a later PR). This makes the wrapper in `ProtocolClient` simpler: instead of being a full implementation, with its own atomic booleans to guard/track the close operations, it just delegates to the underlying implementation.

2. The Connect unary protocol can return a 408 status code for "canceled" and "deadline exceeded" RPC errors, but okhttp auto-retries this status code even though the requests are not idempotent (i.e. even for POST calls). This isn't an issue with the stream conformance tests, but was noticed later, after I added an extra check to the reference server so that it catches cases where a client sends an RPC for the same test case more than once. This commit adds a network interceptor to the `OkHttpClient` that identifies 408 responses that look like Connect unary responses and changes their status code to 499. That is the only way I could find to prevent the retries.

3. The recently introduced flakiness in client streams is actually a rather severe issue. It was mainly observed in the new conformance suite with server streams when gzip was used, because it was all due to race conditions, and the gzip operations would slow down the producer thread just enough to tickle the issue. The problem is that the `RequestBody.writeTo` function should not return before the request body is finished when the body is not duplex. But it was calling `pipe.fold` and then immediately returning; the `fold` method swaps in a new sink in place of the read side of the pipe and then returns, without waiting for the pipe's write side to complete. So now we use a `CountDownLatch` to wait until the writer is complete (which is signaled via a call to `close`).

4. The last issue I encountered was much less frequent, and also turned out to be a race condition. It was caused by a concurrency bug in `okio.Pipe` (square/okio#1412): some duplex operations (i.e. bidi RPCs) would infrequently time out because, even though the stream writer had closed the pipe, the HTTP request body incorrectly remained open. I've opened a PR with a fix in the `okio` library, but I've also added a work-around here for now, using extra synchronization between the calls to `write`, `close`, and `fold`.
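To illustrate the pitfall in issue 1 (with hypothetical, simplified types -- this is not the real `Stream` interface): when `send` reports failure via `Result<Unit>`, the call site looks exactly like a call to a void function, so a failed send compiles cleanly and is silently dropped unless the caller remembers to check or rethrow.

```kotlin
// Hypothetical stand-in for a stream whose send() reports errors via Result<Unit>.
class FakeStream(private val closed: Boolean) {
    fun send(msg: String): Result<Unit> =
        if (closed) Result.failure(IllegalStateException("stream is closed"))
        else Result.success(Unit)
}

fun main() {
    val stream = FakeStream(closed = true)

    stream.send("hello") // compiles with no warning; the failure is silently ignored

    // The caller has to remember to inspect the result...
    check(stream.send("hello").isFailure)

    // ...or convert it back into an exception so it can't be missed:
    try {
        stream.send("hello").getOrThrow()
        check(false) // unreachable: getOrThrow() throws on failure
    } catch (e: IllegalStateException) {
        println("send failed: ${e.message}")
    }
}
```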
I've got each stream type in its own commit, to hopefully make this easier to review.
We still don't run the conformance tests for stream operations in CI because there are some flaky failures that I need to troubleshoot. In some circumstances, the reference servers are sending back unexpected errors for server stream RPCs, so I'll need to use a debugger with the reference server Go code to figure it out.
I want to dig a little bit more into the timeout issues with bidi RPCs, but I think this is a real bug in okhttp and, if so, will file an issue.